package com.baomidou.springboot.spark;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.springboot.constant.Constants;
import com.baomidou.springboot.entity.JsonData;
import com.baomidou.springboot.entity.SessionAggrStat;
import com.baomidou.springboot.entity.Task;
import com.baomidou.springboot.service.JsonDataService;
import com.baomidou.springboot.service.SessionAggrStatService;
import com.baomidou.springboot.service.TaskService;
import com.baomidou.springboot.utils.*;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import parquet.it.unimi.dsi.fastutil.ints.IntList;

import javax.annotation.PostConstruct;

import java.util.Map;

import static com.baomidou.springboot.spark.PageViewRatio.*;
import static com.baomidou.springboot.spark.SessionAnalysis.*;

/**
 * Created by pcmmm on 17/6/8.
 */
@Component
public class AnalysisMain {

    @Autowired
    TaskService taskService;
    @Autowired
    SessionAggrStatService sessionAggrStatService;
    @Autowired
    JsonDataService jsonDataService;

    protected final static Logger logger = LoggerFactory.getLogger(AnalysisMain.class);

    /**
     * Static handle to the Spring-managed bean so the static job entry point
     * {@link #Analysis1(Task)} can reach the injected service fields.
     */
    private static AnalysisMain serviceUtils;

    /**
     * Bridges Spring dependency injection into static context: once injection
     * has completed, the managed instance (with its service fields populated)
     * is published through {@link #serviceUtils}.
     */
    @PostConstruct
    public void init() {
        // Aliasing the bean is sufficient: its injected fields are reachable
        // through serviceUtils. (The previous per-field self-assignments were
        // no-ops, since serviceUtils and this were the same object.)
        serviceUtils = this;
    }

    /**
     * Runs the session-granularity analysis Spark job for one task: loads the
     * local test data into temp tables, aggregates user actions per session,
     * filters sessions by the task's JSON parameters while collecting
     * statistics in an accumulator, and persists three JSON result rows via
     * {@code JsonDataService} — taskType "0" (visit/step-length ratios),
     * "1" (city distribution) and "2" (time distribution).
     *
     * @param task task whose {@code taskParam} JSON drives the date-range and
     *             session filters; its {@code taskid} tags every result row
     */
    public static void Analysis1(Task task) {
        // Build the Spark context; shuffle/storage fractions and Kryo class
        // registration are tuned for this job.
        SparkConf conf = new SparkConf()
                .setAppName(Constants.SPARK_APP_NAME_SESSION)
                .setMaster("local[4]")
                .set("spark.storage.memoryFraction", "0.5")
                .set("spark.shuffle.consolidateFiles", "true")
                .set("spark.shuffle.file.buffer", "64")
                .set("spark.shuffle.memoryFraction", "0.3")
                .set("spark.reducer.maxSizeInFlight", "24")
                .set("spark.shuffle.io.maxRetries", "60")
                .set("spark.shuffle.io.retryWait", "60")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .registerKryoClasses(new Class[]{
                        CategorySortKey.class,
                        IntList.class});
        // NOTE(review): this may override the hard-coded "local[4]" above
        // depending on the deployment mode — confirm which one should win.
        SparkUtils.setMaster(conf);
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            SQLContext sqlContext = SparkUtils.getSQLContext(sc.sc());

            // Load the user_action / user_info test data into temp tables.
            SparkUtils.loadLocalTestDataToTmpTable(sc, sqlContext);
            JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());

            // Raw action rows restricted to the task's date range.
            JavaRDD<Row> actionRDD = SparkUtils.getActionRDDByDateRange(sqlContext, taskParam);

            // Key every action row by its session id: <sessionid, action Row>.
            // Page-slice / session analysis must be done per session, so this
            // pair RDD is the basis for everything downstream.
            JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
            // Cache: reused by the session aggregation below.
            sessionid2actionRDD = sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY());

            // Join with user info to get one aggregated info string per session:
            // <sessionid, sessionid=..|searchKeywords=..|clickCategoryIds=..
            //  |visitLength=..|stepLength=..|startTime=..|age=..|professional=..|city=..|sex=..>
            JavaPairRDD<String, String> sessionid2AggrInfoRDD =
                    aggregateBySession(sc, sqlContext, sessionid2actionRDD);

            // Accumulator that buckets session visit-length / step-length
            // statistics as a side effect of the filter transformation.
            Accumulator<String> sessionAggrStatAccumulator = sc.accumulator(
                    "", new SessionAggrStatAccumulator());

            // Filter sessions by the task's criteria, feeding the accumulator.
            JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = filterSessionAndAggrStat(
                    sessionid2AggrInfoRDD, taskParam, sessionAggrStatAccumulator);
            // BUGFIX: cache BEFORE running any action. Accumulators are
            // updated every time the transformation executes; without this
            // persist, count() and take(1) below each re-run the filter and
            // the statistics are counted more than once per session.
            filteredSessionid2AggrInfoRDD = filteredSessionid2AggrInfoRDD.persist(StorageLevel.MEMORY_ONLY());

            // First action: materializes the filtered RDD and the accumulator.
            long session_count = filteredSessionid2AggrInfoRDD.count();
            System.out.println("满足条件的session个数："+ session_count);

            // Parameterized logging instead of string concatenation.
            logger.info("filteredSessionid2AggrInfoRDD:{}", filteredSessionid2AggrInfoRDD.take(1));

            // Persist session visit-length / step-length ratios (taskType "0").
            SessionAggrStat sessionAggrStat = calculateAggrStat(sessionAggrStatAccumulator.value(),
                    task.getTaskid());
            String ratio_data = JSON.toJSONString(sessionAggrStat);
            JsonData sessionData = new JsonData();
            sessionData.setTaskid(sessionAggrStat.getTaskid());
            sessionData.setData(ratio_data);
            sessionData.setTaskType("0");
            serviceUtils.jsonDataService.insert(sessionData);

            // Persist the city distribution (taskType "1").
            String cityJson = calulateCityRatio(sessionAggrStatAccumulator.value());
            JsonData cityData = new JsonData();
            cityData.setTaskType("1");
            cityData.setData(cityJson);
            cityData.setTaskid(task.getTaskid());
            serviceUtils.jsonDataService.insert(cityData);

            // Persist the time distribution (taskType "2").
            String timeJson = calulateTimeRatio(sessionAggrStatAccumulator.value());
            System.out.println(timeJson);
            JsonData timeData = new JsonData();
            timeData.setTaskType("2");
            timeData.setData(timeJson);
            timeData.setTaskid(task.getTaskid());
            serviceUtils.jsonDataService.insert(timeData);
        } finally {
            // BUGFIX: always stop the context so a failed job does not leak
            // the SparkContext (previously stop() was skipped on exceptions).
            sc.stop();
        }
    }

}
